﻿#include <fcntl.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>

#include <string>

#include "net.h"
#include "logging.h"
#include "KCPHandler.h"

using namespace std;
/* Read the current wall-clock time via gettimeofday().
 *
 * sec  - if non-null, receives the seconds part (tv_sec).
 * usec - if non-null, receives the microseconds part (tv_usec).
 * Either pointer may be null when the caller does not need that part.
 */
static inline void itimeofday(long *sec, long *usec)
{
	struct timeval now;
	gettimeofday(&now, NULL);

	if (sec != NULL) {
		*sec = now.tv_sec;
	}
	if (usec != NULL) {
		*usec = now.tv_usec;
	}
}

/* Current wall-clock time in milliseconds as a 64-bit value:
 * seconds * 1000 plus the microsecond part truncated to milliseconds. */
static inline IINT64 iclock64(void)
{
	long seconds, micros;
	itimeofday(&seconds, &micros);
	return ((IINT64)seconds) * 1000 + (micros / 1000);
}

/* Millisecond clock reduced to its low 32 bits; the value wraps around
 * when the 64-bit millisecond count exceeds 32 bits. */
static inline IUINT32 iclock()
{
	const IINT64 nowMs = iclock64();
	return (IUINT32)(nowMs & 0xfffffffful);
}

/* Sleep for the given number of milliseconds.
 *
 * Uses nanosleep() so that durations of one second or more are well defined
 * (usleep() is permitted to reject arguments >= 1,000,000 microseconds).
 * If the sleep is interrupted by a signal it is resumed for the remaining
 * time; any other error ends the sleep early.
 */
static inline void isleep(unsigned long millisecond)
{
	struct timespec ts;
	ts.tv_sec = (time_t)(millisecond / 1000);
	ts.tv_nsec = (long)((millisecond % 1000) * 1000000);
	/* On EINTR nanosleep stores the unslept remainder back into ts. */
	while (nanosleep(&ts, &ts) == -1 && errno == EINTR) {
	}
}

static int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{
	union { int id; void *ptr; } parameter;
	parameter.ptr = user;
	((KCPHandler*)user)->InputKcpPackage(buf, len);
	return 0;
}
/* KCP log callback (installed as _kcp->writelog by the constructor).
 * Writes the message to stdout exactly as received; no newline is added.
 * The kcp and user arguments are not used. */
static void write_log(const char *log, struct IKCPCB *kcp, void *user)
{
	(void)kcp;
	(void)user;
	printf("%s", log);
}


/* Creates the non-blocking, close-on-exec UDP socket and the KCP control
 * block, wires the KCP callbacks to this object, applies the tuning
 * parameters, and starts the periodic statistics dump (_CheckKcpInfo).
 */
KCPHandler::KCPHandler() :EventHandler()
{
	_handle = socket(AF_INET, SOCK_DGRAM, 0);
	fatalif(_handle < 0, "socket failed %d %s", errno, strerror(errno));
	net::setNonBlock(_handle);
	// NOTE(review): strerror(t) assumes addFdFlag returns an errno-style code;
	// if it returns -1 and sets errno, errno should be reported instead - confirm.
	int t = util::addFdFlag(_handle, FD_CLOEXEC);
	fatalif(t, "addFdFlag FD_CLOEXEC failed %d %s", t, strerror(t));

	// The conversation id is hard-coded; `this` is the user pointer that
	// udp_output() later casts back to KCPHandler*.
	_kcp = ikcp_create(0x11223344, (void*)this);
	// ikcp_create can fail (allocation); report it before _kcp is dereferenced.
	fatalif(_kcp == NULL, "ikcp_create failed %d %s", errno, strerror(errno));
	_kcp->output = udp_output;
	_kcp->writelog = write_log;
	ikcp_wndsize(_kcp, 128, 128);
	ikcp_nodelay(_kcp, 1, 10, 2, 1);
	// Direct field overrides applied on top of the ikcp_nodelay() settings.
	_kcp->rx_minrto = 10;
	_kcp->fastresend = 1;
	_inbuf.makeRoom();
	_CheckKcpInfo();
}
void KCPHandler::_CheckKcpInfo()
{
	heap_timer* task = new heap_timer(1000);
	task->cb_func = [=](SUsedataPtr){
		printf(" snd_wnd %u rcv_wnd %u\n rmt_wnd %u cwnd %u\n nrcv_buf %u nrcv_que %u\n nsnd_buf %u nsnd_que %u revSize %d\n ",
			_kcp->snd_wnd, _kcp->rcv_wnd, _kcp->rmt_wnd, _kcp->cwnd, _kcp->nrcv_buf, _kcp->nrcv_que, _kcp->nsnd_buf, _kcp->nsnd_que,_cnt);
		_CheckKcpInfo();
	};
	g_reactor.RegisterTimerTask(task);
}

/* Releases what the constructor acquired, in reverse order of acquisition:
 * the KCP control block first, then the UDP socket.
 * NOTE(review): timer callbacks registered by _CheckKcpInfo/StartUdpateTimer
 * capture `this`; nothing here cancels them - confirm the reactor's contract.
 */
KCPHandler::~KCPHandler()
{
	ikcp_release(_kcp);
	close(_handle);
}
/* Installs the message codec and the per-message callback.
 *
 * codec - decoder used to split the receive buffer into messages; ownership
 *         is taken over by _codec.reset().
 * cb    - invoked once per decoded message with the handler, the message
 *         bytes and length, and the current peer's ip and port.
 *
 * Must be called before any read callback has been set (asserted).
 *
 * The read callback drains every complete message currently buffered, since
 * HandleRead may append several KCP messages before invoking it. When the
 * codec returns a non-positive result the remaining buffered bytes are
 * discarded, as before.
 */
void KCPHandler::OnMsgCall(CodecBase* codec, const MsgCallBackWithPeer& cb)
{
	// Check the precondition before touching any state.
	assert(!_readCb);
	_msgCallBack = cb;
	_codec.reset(codec);
	OnRead(
		[this](std::shared_ptr<EventHandler> handler)
	{
		std::shared_ptr<KCPHandler> self = std::dynamic_pointer_cast<KCPHandler>(handler);
		while (_inbuf.size() > 0)
		{
			Slice msg;
			Slice msgbuf(_inbuf.data(), _inbuf.size());
			// Extract one message from the byte stream; r > 0 is the number
			// of buffered bytes that message occupied.
			int r = self->_codec->tryDecode(msgbuf, msg);
			if (r <= 0)
			{
				_inbuf.consume(_inbuf.size());
				break;
			}
			_msgCallBack(shared_from_this(), msg.toString().c_str(), msg.size(), _peer.ip().c_str(), _peer.port());
			_inbuf.consume(r);
		}
	}
	);
}
/* Read-event handler.
 *
 * 1. _DyRev() drains the socket and feeds the datagrams to KCP; it returns
 *    the number of bytes fed (never negative).
 * 2. Every message KCP has ready is appended to _inbuf. The buffer's size is
 *    advanced after each message so the next one lands behind it instead of
 *    overwriting it.
 * 3. If any payload was appended, the read callback is invoked; otherwise
 *    the handler is registered for the next read event.
 */
void KCPHandler::HandleRead()
{
	if (_DyRev() > 0)
	{
		int revSize = 0;
		while (true)
		{
			_inbuf.makeRoom();
			// A negative result ends the drain (per the KCP API: nothing
			// complete is queued, or the next message does not fit).
			const int got = ikcp_recv(_kcp, _inbuf.end(), _inbuf.space());
			if (got < 0)
				break;
			_inbuf.addSize(got);
			revSize += got;
		}
		if (revSize > 0)
		{
			_cnt += revSize;
			_readCb(shared_from_this());
			return;
		}
	}
	// Nothing was delivered to the application: wait for more input.
	g_reactor.RegisterHandler(shared_from_this(), reactor::kReadEvent);
}

/* Transmits the contents of _outbuf to _peer with sendto().
 *
 * Keeps sending while bytes remain; a call interrupted by a signal (EINTR)
 * is retried, and any other non-positive result (including EAGAIN /
 * EWOULDBLOCK) stops the loop. Bytes that were sent are removed from
 * _outbuf.
 *
 * Returns the number of bytes sent (0 if none).
 */
ssize_t KCPHandler::_DySend()
{
	const socklen_t addrLen = sizeof(struct sockaddr_in);
	size_t sended = 0;

	for (;;) {
		if (sended >= _outbuf.size())
			break;

		const ssize_t wd = ::sendto(_handle, _outbuf.data() + sended, _outbuf.size() - sended, 0, (struct sockaddr *)&_peer.getAddr(), addrLen);
		if (wd > 0) {
			sended += wd;
			continue;
		}
		if (wd == -1 && errno == EINTR) {
			continue;
		}
		// Would-block, zero-length result, or a hard error: give up for now.
		break;
	}

	if (sended > 0)
		_outbuf.consume(sended);
	return sended;
}
/* Drains the UDP socket and feeds every datagram to KCP.
 *
 * Each datagram is passed to ikcp_input() as soon as it is received, so a
 * later datagram can no longer overwrite an earlier one before KCP has seen
 * it. _peer is updated only from the source address of a datagram that was
 * actually received.
 *
 * The loop ends when recvfrom() reports an error other than EINTR (this
 * includes EAGAIN / EWOULDBLOCK, i.e. the socket is drained). Empty
 * datagrams are skipped.
 *
 * Returns the total number of payload bytes received (0 if none).
 */
ssize_t KCPHandler::_DyRev()
{
	// A single UDP payload cannot exceed 64 KiB, so this holds any datagram.
	const int kbufsize = 64 * 1024;
	char szbuf[kbufsize];
	ssize_t datasize = 0;

	while (_handle > 0) {
		struct sockaddr_in cli_addr;
		// Value-result argument: must be reset before every call.
		socklen_t len = sizeof(cli_addr);
		const ssize_t rd = recvfrom(_handle, szbuf, kbufsize, 0, (struct sockaddr *)&cli_addr, &len);
		if (rd == -1 && errno == EINTR) { // interrupted, data may still be pending
			continue;
		}
		if (rd < 0) { // socket drained (EAGAIN/EWOULDBLOCK) or hard error
			break;
		}
		if (rd == 0) { // empty datagram carries nothing for KCP
			continue;
		}
		if (ikcp_input(_kcp, szbuf, rd) != 0)
		{
			trace("分片不合法");
		}
		_peer = Ip4Addr(cli_addr);
		datasize += rd;
	}
	return datasize;
}



/* Write-event handler: pushes the pending output buffer to the peer via
 * _DySend() and then registers the handler for read events again. The
 * number of bytes sent is not acted upon. */
void KCPHandler::HandleWrite()
{
	const int sentBytes = _DySend();
	(void)sentBytes; // only of interest for tracing: "send response to client"
	g_reactor.RegisterHandler(shared_from_this(), reactor::kReadEvent);
}
bool KCPHandler::Connect(const char* ip, unsigned short nport)
{

	Ip4Addr addr(string(ip), nport);
	/*int r = ::connect(_handle, (sockaddr *)&addr.getAddr(), sizeof(sockaddr_in));
	if (r != 0 && errno != EINPROGRESS) {
	error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno));
	return false;
	}	*/
	_peer = addr;
	return true;
}


/* Queues one KCP-produced packet for transmission.
 *
 * Called from udp_output(); appends len bytes starting at szByte to the
 * output buffer and registers the handler for a write event so that
 * HandleWrite() sends it.
 */
void KCPHandler::InputKcpPackage(const char* szByte, int len)
{
	_outbuf.makeRoom();
	_outbuf.append(szByte, len);

	g_reactor.RegisterHandler(shared_from_this(), reactor::kWriteEvent);
}
bool KCPHandler::Send(const char* szByte, int len, const char* ip, unsigned int port){
	if (!Connect(ip, port)){
		return false;
	}
	ikcp_send(_kcp, szByte, len);	
	_peer = Ip4Addr(ip, port);
	g_reactor.RegisterHandler(shared_from_this(), reactor::kWriteEvent);
	return true;
}
bool KCPHandler::Bind(const char* ip, short port, bool reusePort) {
	struct sockaddr_in addr;
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = inet_addr(ip);
	if (::bind(_handle, (struct sockaddr *)&addr, sizeof(addr)) < 0)
	{
		error("Bind %s %d failed\n",ip,port);
		return false;
	}
	return true;
}

void KCPHandler::StartUdpateTimer()
{
	heap_timer* reconntask = new heap_timer(5);	
	reconntask->cb_func = [=](SUsedataPtr user_data)
	{								
		ikcp_update(_kcp, iclock());
		StartUdpateTimer();
	};		
	int r = g_reactor.RegisterTimerTask(reconntask);
	errorif(r != 0, "register time task false\n");
}

